package com.xiaohu.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;





/*
    自定义sink
 */
public class CustomSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> socketSource = env.socketTextStream("master", 7777);

        socketSource.addSink(new MySink());


        env.execute();
    }

    //TODO: 实现普通SinkFunction接口
//    public static class MySink implements SinkFunction<String>{
//        /*
//            sink的核心逻辑，写逻辑就写在这个方法里
//            注意：这个方法是来一条数据调用一次，不要在这里创建连接对象，所以一般自定义sink是继承富函数类
//         */
//        @Override
//        public void invoke(String value, Context context) throws Exception {
//            //写出逻辑：比如写到mysql
//            SinkFunction.super.invoke(value, context);
//        }
//    }

    //TODO: 继承RichXxxFunction
    public static class MySink extends RichSinkFunction<String>{

        @Override
        public void open(Configuration parameters) throws Exception {
            //任务启动，每个子任务会调用一次，在这里创建连接对象，配合状态使用
            super.open(parameters);
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            super.invoke(value, context);
        }

        @Override
        public void close() throws Exception {
            //清理，销毁对象
            super.close();
        }
    }

}
